6e75016c45c602c874086dea26324ca413f0c141,external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java,SingleTopicKafkaSpoutTest,shouldEmitAllMessages,#,132
Before Change
/**
 * Drives the spout through {@code messageCount} calls to {@code nextTuple()}, one per expected
 * message. After each call it verifies that the collector saw an emit on
 * {@link SingleTopicKafkaSpoutConfiguration#STREAM} whose values are the topic followed by the
 * message index rendered twice as a string, then acks the captured message id and resets the
 * collector mock so the next verification starts clean.
 *
 * <p>Finally, every entry in {@code spout.acked} is checked via {@code assertOffsetCommitted}
 * against {@code messageCount - 1}.
 */
@Test
public void shouldEmitAllMessages() throws Exception {
    int messageCount = 10;
    SpoutContext context = initializeSpout(messageCount);

    for (int index = 0; index < messageCount; index++) {
        String expectedText = Integer.toString(index);
        Values expectedTuple =
            new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, expectedText, expectedText);

        context.spout.nextTuple();

        // Capture whatever message id the spout attached so it can be acked below.
        ArgumentCaptor<Object> ackIdCaptor = ArgumentCaptor.forClass(Object.class);
        verify(context.collector).emit(
            eq(SingleTopicKafkaSpoutConfiguration.STREAM),
            eq(expectedTuple),
            ackIdCaptor.capture());

        context.spout.ack(ackIdCaptor.getValue());

        // Clear recorded interactions so the next iteration's verify() sees a single emit.
        reset(context.collector);
    }

    int lastOffset = messageCount - 1;
    for (Object entry : context.spout.acked.values()) {
        assertOffsetCommitted(lastOffset, (KafkaSpout.OffsetEntry) entry);
    }
}
@Test
After Change
public void shouldEmitAllMessages() throws Exception {
try (SimulatedTime simulatedTime = new SimulatedTime()) {
int messageCount = 10;
initializeSpout(messageCount);
//Emit all messages and check that they are emitted. Ack the messages too
IntStream.range(0, messageCount).forEach(value -> {